package com.xlongwei.logserver;

import java.io.File;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.io.FilenameUtils;

import com.networknt.utility.StringUtils;

import io.undertow.Handlers;
import io.undertow.server.handlers.sse.ServerSentEventConnection;
import io.undertow.server.handlers.sse.ServerSentEventConnectionCallback;
import io.undertow.server.handlers.sse.ServerSentEventHandler;
import io.undertow.websockets.WebSocketConnectionCallback;
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSockets;
import io.undertow.websockets.spi.WebSocketHttpExchange;

/**
 * Pushes the latest log output to browsers in real time, over both WebSocket
 * and Server-Sent Events (SSE).
 * <p>
 * A newly connected client first receives the tail of the log file named by
 * {@code ExecUtil.logs}. After that, every text handed to {@link #notify(String)}
 * is queued and broadcast to all open connections by a single background task
 * submitted to {@code PageHandler.scheduler}.
 *
 * @author xlongwei
 */
public class TailCallback implements WebSocketConnectionCallback {
	/**
	 * Second argument handed to {@code ExecUtil.tail} on connect; presumably the
	 * number of trailing lines to return -- confirm against ExecUtil.
	 */
	private static final int INITIAL_TAIL_SIZE = 100;
	private static final File logs = new File(ExecUtil.logs);
	/** Messages waiting to be broadcast; unbounded, so {@code offer} does not block. */
	private static final BlockingQueue<String> notifyQueue = new LinkedBlockingDeque<>();
	/** True while a consumer task is (being) started or running; guards against starting two. */
	private static final AtomicBoolean notifyQueueStarted = new AtomicBoolean(false);
	/**
	 * Most recently connected WebSocket channel, used only to reach its peer
	 * connections. Volatile because it is written in {@link #onConnect} and read
	 * by {@link #notify(String)} callers and the consumer task.
	 */
	private static volatile WebSocketChannel channel;

	/** SSE endpoint handler; each new SSE connection is sent the current log tail. */
	public static ServerSentEventHandler sse = Handlers.serverSentEvents(new ServerSentEventConnectionCallback() {
		@Override
		public void connected(ServerSentEventConnection connection, String lastEventId) {
			String tail = initialTail();
			if (tail != null) {
				connection.send(tail);
			}
		}
	});

	/**
	 * Queues a text for broadcast to every open WebSocket and SSE connection.
	 * <p>
	 * Blank texts are ignored, and nothing is queued while no WebSocket client
	 * has ever connected and no SSE connection exists. The first successfully
	 * queued text starts the consumer task; if that task ever terminates, a later
	 * call starts a new one.
	 *
	 * @param txt the text to push to connected clients
	 */
	public static void notify(String txt) {
		if (StringUtils.isBlank(txt)) {
			return;
		}
		if (channel == null && sse.getConnections().isEmpty()) {
			return;
		}
		if (!notifyQueue.offer(txt)) {
			return;
		}
		// compareAndSet makes "check and mark started" one atomic step, so concurrent
		// callers cannot start two consumers (which could reorder messages).
		if (notifyQueueStarted.compareAndSet(false, true)) {
			try {
				PageHandler.scheduler.submit(() -> {
					try {
						while (true) {
							broadcast(notifyQueue.take());
						}
					} finally {
						// Reached only when the loop is left by an exception (e.g. take() was
						// interrupted). Clear the flag so the next notify() restarts the
						// consumer instead of filling the queue with nobody draining it.
						notifyQueueStarted.set(false);
					}
				});
			} catch (RuntimeException e) {
				// The task was never accepted; undo the flag before propagating.
				notifyQueueStarted.set(false);
				throw e;
			}
		}
	}

	/**
	 * Sends the log tail to the newly connected WebSocket client, if the log
	 * file exists, and then remembers the channel for later broadcasts.
	 */
	@Override
	public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) {
		String tail = initialTail();
		if (tail != null) {
			WebSockets.sendText(tail, channel, null);
		}
		TailCallback.channel = channel;
	}

	/**
	 * Returns the text sent to a client right after it connects.
	 *
	 * @return the result of {@code ExecUtil.tail} for the log file, or
	 *         {@code null} when the log file does not exist
	 */
	private static String initialTail() {
		if (!logs.exists()) {
			return null;
		}
		return ExecUtil.tail(FilenameUtils.getName(ExecUtil.logs), INITIAL_TAIL_SIZE);
	}

	/** Sends one message to every open WebSocket peer and every open SSE connection. */
	private static void broadcast(String message) {
		// Read the volatile field once so the null check and the call see the same value.
		WebSocketChannel current = channel;
		Set<WebSocketChannel> peers = current == null ? Collections.emptySet() : current.getPeerConnections();
		for (WebSocketChannel peer : peers) {
			try {
				if (peer.isOpen()) {
					WebSockets.sendText(message, peer, null);
				}
			} catch (RuntimeException ignored) {
				// Best-effort push: a failure on one connection must neither stop
				// delivery to the others nor terminate the consumer task.
			}
		}
		for (ServerSentEventConnection connection : sse.getConnections()) {
			try {
				if (connection.isOpen()) {
					connection.send(message);
				}
			} catch (RuntimeException ignored) {
				// Best-effort push, same reasoning as for the WebSocket peers above.
			}
		}
	}

}
